
# This script takes user-year work vectors for clustering created by wp-health/src/python/week8/kmeansVectors.py and formats
# them for kmeans++ on graphlab




import os
import sys
import multiprocessing
import datetime
import traceback
import math

import wp_datetime as dt

def main():

	# The number of worker processes the worker pool will use to process the work queue.
	# Recommend setting this to half the number of available cores to avoid hogging resources.
	# That would be 24 cores on platinum.
	# Once you get above a certain level of CPU usage, I/O throughput will bottleneck anyway.
	NUM_WORKERS = 24
	
	# The number of translation tasks given to each process at a time.
	# Recommend this setting to be 1 for maximum throughput.
	# If I/O throughput ever becomes an issue, increasing this value will make non-CPU bound tasks run more efficiently.
	TASK_PARTITION_SIZE = 1
	
	
	inputDir = sys.argv[1]
	outputDir = sys.argv[2]

	inputFileList = os.listdir(inputDir)

	filePathArgs = []
	
	for inputFileName in inputFileList:
		
		inputFilePath = inputDir+'/'+inputFileName
		outputFilePath = outputDir+'/'+inputFileName
		
		if os.path.isfile(inputFilePath):
			filePathArgs.append( (inputFilePath, outputFilePath) )

	workerPool = multiprocessing.Pool(NUM_WORKERS)
	workerPool.map_async(processFile, filePathArgs, TASK_PARTITION_SIZE)
	workerPool.close()
	workerPool.join()
			
def processFile(filePathArgs):
	
	try:
		inputFilePath = filePathArgs[0]
		outputFilePath = filePathArgs[1]

		print timestamp() + " Starting processing for " + inputFilePath + " to " + outputFilePath

		inputFileHandle = open(inputFilePath,"r")
		outputFileHandle = open(outputFilePath, 'a+')

		translate(inputFileHandle, outputFileHandle)
	
		inputFileHandle.close()
		outputFileHandle.close()

		print timestamp() + " Finished processing for " + inputFilePath + " in " + outputFilePath
	
	except:
		# There is a problem where if a child process encounters an error or exception, the traceback gets written
		# to stderr but not flushed, so you never see it. This code fixes that problem by pushing it to stdout, where it is 
		# piped back to the main process through the pool.
		traceback.print_exc(limit=3, file=sys.stdout)
		sys.stdout.flush()

def translate(inputFileHandle, outputFileHandle):

	for line in inputFileHandle:
		
		fields = line.strip().split('\t')

		startoffset = 5

		norm = 0.0
		for index in range(startoffset, len(fields)):	
			norm += float(fields[index])**2

		norm = math.sqrt(norm)

		output = []
		for index in range(startoffset, len(fields)):
			if norm != 0:
				output.append(str(float(fields[index]) / norm))
			else:
				output.append('0.0')

		outputFileHandle.write('\t'.join(output)+'\n')

def timestamp():
	
	return datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

# Protects against the script being loaded and run recursively in the child process
# Did not seem to be a problem, but better safe than sorry
if __name__ == '__main__':
	main()
